﻿using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

namespace SimpleScheduler
{

#if NET6_0_OR_GREATER

    internal sealed class DelayQueue : IDelayQueue
    {
        private readonly PeriodicTimer _checkTimer;
        private readonly PeriodicTimer _queueTimer;
        private readonly ConcurrentDictionary<DelayQueueKey, IList<TaskItem>> _queueItems;
        private readonly SortedDictionary<ulong, Queue<DelayQueueKey>> _queueSortValues;
        private readonly ActionBlock<DelayQueueKey> _plusTimerCount;
        private readonly BufferBlock<Queue<DelayQueueKey>> _queueBuffer;
        private readonly ActionBlock<Queue<DelayQueueKey>> _queueAction;
        private readonly ActionBlock<ulong> _timeFinishAction;
        private readonly ActionBlock<(DelayQueueKey key, TaskItem queueItem)> _retryAction;
        private readonly string _queueName;
        private readonly TimeSpan _delayAccuracy;
        private readonly bool _enableRetry;
        private readonly ushort _retryCount;
        private readonly bool _shareQuery = false;
        private readonly Func<DelayQueueKey, bool> _shareExistFunc;

        private CancellationTokenSource CancelToken { get; set; }

        private QueueOptions _queueOptions;
        /// <summary>
        /// 暂停计时
        /// </summary>
        volatile bool PauseCount;
        volatile bool IsStarted;
        ulong MinQueueKey;
        ulong _timerCount = ulong.MinValue;

        internal DelayQueue(string queueName, QueueOptions opts, Func<DelayQueueKey, bool> existFunc) : this(queueName, opts)
        {
            _shareExistFunc = existFunc;
        }

        internal DelayQueue(string queueName, QueueOptions opts)
        {
            _queueName = queueName ?? Constants.DefaultQueueName;
            _queueOptions = opts;
            _enableRetry = opts?.EnableRetry == true;
            _shareQuery = opts?.ShareExists == true;
            _retryCount = opts?.RetryMax ?? 1;
            _retryCount = (ushort)(_retryCount < 1 ? 1 : _retryCount > 10 ? 10 : _retryCount);
            _delayAccuracy = GetDelayAccuracy(opts?.DelayAccuracy);
            int max = opts?.QueueHandledConcurrency ?? 1;
            _queueBuffer = new BufferBlock<Queue<DelayQueueKey>>();
            _queueAction = new ActionBlock<Queue<DelayQueueKey>>(QueueBlockAction, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = max });
            _queueBuffer.LinkTo(_queueAction);
            _queueItems = new ConcurrentDictionary<DelayQueueKey, IList<TaskItem>>();
            _queueSortValues = new SortedDictionary<ulong, Queue<DelayQueueKey>>();
            _plusTimerCount = new ActionBlock<DelayQueueKey>(SetTimerCount, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
            _timeFinishAction = new ActionBlock<ulong>(TimeFinished, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
            _retryAction = new ActionBlock<(DelayQueueKey key, TaskItem queueItem)>(AddRetry, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
            CancelToken = new CancellationTokenSource();

            _checkTimer = new PeriodicTimer(TimeSpan.FromMinutes(10));
            _queueTimer = new PeriodicTimer(_delayAccuracy);
            StartCheck();
        }

        async Task StartCheck()
        {
            while (await _checkTimer.WaitForNextTickAsync())
            {
                if (_queueSortValues.Count < 1)
                {
                    _timerCount = ulong.MinValue;
                }
            }
        }

        async Task StartTimer()
        {
            if (IsStarted)
            {
                return;
            }
            CancelToken = new CancellationTokenSource();
            IsStarted = true;
            while (await _queueTimer.WaitForNextTickAsync(CancelToken.Token))
            {
                if (PauseCount)
                {
                    continue;
                }
                _timerCount++;
                if (MinQueueKey > ulong.MinValue && _timerCount == MinQueueKey)
                {
                    await _timeFinishAction.SendAsync(MinQueueKey);
                }
                MinQueueKey = _queueSortValues.Count < 1 ? ulong.MinValue : _queueSortValues.Keys.ElementAt(0);
            }
            IsStarted = false;
        }

        void TimeFinished(ulong arg)
        {
            if (arg == ulong.MinValue)
            {
                return;
            }
            if (_queueSortValues.TryGetValue(arg, out Queue<DelayQueueKey> value))
            {
                _queueBuffer.Post(value);
                _queueSortValues.Remove(arg);
            }
        }

        void SetTimerCount(DelayQueueKey arg)
        {
            PauseCount = true;
            try
            {
                double delayVal = arg.DelayTime / _delayAccuracy;
                ulong minCount = delayVal < 1 ? 1 : (ulong)Math.Ceiling(delayVal);
                Queue <DelayQueueKey> queue;
                if (_queueSortValues.Count < 1)
                {
                    queue = new Queue<DelayQueueKey>();
                    queue.Enqueue(arg);
                    _queueSortValues.TryAdd(minCount, queue);
                    //重置计时标记
                    _timerCount = ulong.MinValue;
                }
                else
                {
                    minCount = _timerCount + minCount;
                    if (_queueSortValues.TryGetValue(minCount, out queue))
                    {
                        queue.Enqueue(arg);
                    }
                    else
                    {
                        queue = new Queue<DelayQueueKey>();
                        queue.Enqueue(arg);
                        _queueSortValues.TryAdd(minCount, queue);
                    }
                }
                MinQueueKey = _queueSortValues.Keys.ElementAt(0);
            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                PauseCount = false;
            }
        }

        TimeSpan GetDelayAccuracy(QueueDelayAccuracy? level)
        {
            return level switch
            {
                QueueDelayAccuracy.Minute => TimeSpan.FromMinutes(1),
                QueueDelayAccuracy.Milliseconds => TimeSpan.FromMilliseconds(1),
                _ => TimeSpan.FromSeconds(1),
            };
        }

        async Task QueueBlockAction(Queue<DelayQueueKey> arg)
        {
            while (arg?.Count > 0)
            {
                if (!arg.TryDequeue(out DelayQueueKey key))
                {
                    return;
                }
                if (_queueItems.TryRemove(key, out IList<TaskItem> taskQueue))
                {
                    Parallel.ForEach(taskQueue, (tq) =>
                    {
                        if (tq == null || tq.IsCancellation)
                        {
                            return;
                        }
                        bool? result = tq.ExecuteAction();
                        if (_enableRetry && tq.EnableRetry && result != true)
                        {
                            _retryAction.SendAsync(ValueTuple.Create(key, tq));
                        }
                    });
                }
            }
        }

        async Task AddRetry((DelayQueueKey key, TaskItem queueItem) item)
        {
            if (item.key.RetryCount >= _retryCount)
            {
                return;
            }
            item.key.RetryCount += 1;
            await _plusTimerCount.SendAsync(item.key);
            IList<TaskItem> queueModel;
            if (_queueItems.TryGetValue(item.key, out queueModel))
            {
                queueModel.Add(item.queueItem);
            }
            else
            {
                queueModel = new List<TaskItem>() { item.queueItem };
                _queueItems.TryAdd(item.key, queueModel);
            }
        }

        public async ValueTask<bool> Enqueue(QueueItem callback)
        {
            if (callback == null)
            {
                return false;
            }
            callback.QueueName = string.IsNullOrWhiteSpace(callback.QueueName) ? _queueName : callback.QueueName;
            if (!string.Equals(_queueName, callback.QueueName))
            {
                return false;
            }
            var key = new DelayQueueKey
            {
                QueueName = callback.QueueName,
                TaskName = callback.TaskName,
                DelayTime = callback.DelayTime,
            };
            await _plusTimerCount.SendAsync(key);
            IList<TaskItem> queueModel;
            if (_queueItems.TryGetValue(key, out queueModel))
            {
                queueModel.Add(callback);
                return true;
            }
            else
            {
                queueModel = new List<TaskItem>() { callback };
                return _queueItems.TryAdd(key, queueModel);
            }
        }

        public async ValueTask<bool> Enqueue<T>(QueueItem<T> data)
        {
            if (data == null)
            {
                return false;
            }
            data.QueueName = string.IsNullOrWhiteSpace(data.QueueName) ? _queueName : data.QueueName;
            if (!string.Equals(_queueName, data.QueueName))
            {
                return false;
            }
            var key = new DelayQueueKey
            {
                QueueName = data.QueueName,
                TaskName = data.TaskName,
                DelayTime = data.DelayTime,
            };
            await _plusTimerCount.SendAsync(key);
            IList<TaskItem> queueModel;
            if (_queueItems.TryGetValue(key, out queueModel))
            {
                queueModel.Add(data);
                return true;
            }
            else
            {
                queueModel = new List<TaskItem>() { data };
                return _queueItems.TryAdd(key, queueModel);
            }
        }

        public bool Exist(QueueItem data)
        {
            var key = new DelayQueueKey()
            {
                QueueName = data.QueueName,
                TaskName = data.TaskName,
                DelayTime = data.DelayTime,
            };
            if (_queueItems.ContainsKey(key))
            {
                return true;
            }
            if (_shareQuery && _shareExistFunc != null)
            {
                return _shareExistFunc.Invoke(key);
            }
            return false;
        }

        public bool Exist<T>(QueueItem<T> data)
        {
            var key = new DelayQueueKey()
            {
                QueueName = data.QueueName,
                TaskName = data.TaskName,
                DelayTime = data.DelayTime,
            };
            if (_queueItems.ContainsKey(key))
            {
                return true;
            }
            if (_shareQuery && _shareExistFunc != null)
            {
                return _shareExistFunc.Invoke(key);
            }
            return false;
        }

        internal bool Exist(DelayQueueKey key)
        {
            return _queueItems.ContainsKey(key);
        }

        public void Start()
        {
            StartTimer();
            Console.WriteLine("Started: {0}", DateTime.Now);
        }

        public void Stop()
        {
            if (!IsStarted)
            {
                return;
            }
            CancelToken?.Cancel(false);
            IsStarted = false;
        }

        public async void Clear()
        {
            _queueItems.Clear();
            while (PauseCount)
            {
                await Task.Delay(100);
            }
            _queueSortValues.Clear();
        }
    }


#else

    internal sealed class DelayQueue : IDelayQueue
    {
        private readonly System.Timers.Timer _checkTimer;
        private readonly System.Timers.Timer _queueTimer;
        private readonly ConcurrentDictionary<DelayQueueKey, IList<BaseQueueItem>> _queueItems;
        private readonly SortedDictionary<ulong, Queue<DelayQueueKey>> _queueSortValues;
        private readonly ActionBlock<DelayQueueKey> _plusTimerCount;
        private readonly BufferBlock<Queue<DelayQueueKey>> _queueBuffer;
        private readonly ActionBlock<Queue<DelayQueueKey>> _queueAction;
        private readonly ActionBlock<ulong> _timeFinishAction;
        private readonly ActionBlock<(DelayQueueKey key, BaseQueueItem queueItem)> _retryAction;
        private readonly string _queueName;
        private readonly TimeSpan _delayAccuracy;
        private readonly bool _enableRetry;
        private readonly ushort _retryCount;
        private readonly bool _shareQuery = false;
        private readonly Func<DelayQueueKey, bool> _shareExistFunc;

        private CancellationTokenSource CancelToken { get; set; }

        private QueueOptions _queueOptions;
        /// <summary>
        /// 暂停计时
        /// </summary>
        volatile bool PauseCount;
        volatile bool IsStarted;
        ulong MinQueueKey;
        ulong _timerCount = ulong.MinValue;

        internal DelayQueue(string queueName, QueueOptions opts, Func<DelayQueueKey, bool> existFunc) : this(queueName, opts)
        {
            _shareExistFunc = existFunc;
        }

        internal DelayQueue(string queueName, QueueOptions opts)
        {
            _queueName = queueName ?? Constants.DefaultQueueName;
            _queueOptions = opts;
            _enableRetry = opts?.EnableRetry == true;
            _shareQuery = opts?.ShareExists == true;
            _retryCount = opts?.RetryMax ?? 1;
            _retryCount = (ushort)(_retryCount < 1 ? 1 : _retryCount > 10 ? 10 : _retryCount);
            _delayAccuracy = GetDelayAccuracy(opts?.DelayAccuracy);
            int max = opts?.QueueHandledConcurrency ?? 1;
            _queueBuffer = new BufferBlock<Queue<DelayQueueKey>>();
            _queueAction = new ActionBlock<Queue<DelayQueueKey>>(QueueBlockAction, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = max });
            _queueBuffer.LinkTo(_queueAction);
            _queueItems = new ConcurrentDictionary<DelayQueueKey, IList<BaseQueueItem>>();
            _queueSortValues = new SortedDictionary<ulong, Queue<DelayQueueKey>>();
            _plusTimerCount = new ActionBlock<DelayQueueKey>(SetTimerCount, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
            _timeFinishAction = new ActionBlock<ulong>(TimeFinished, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
            _retryAction = new ActionBlock<(DelayQueueKey key, BaseQueueItem queueItem)>(AddRetry, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
            CancelToken = new CancellationTokenSource();

            _checkTimer = new System.Timers.Timer();
            _checkTimer.Interval = TimeSpan.FromMinutes(10).TotalMilliseconds;
            _checkTimer.Elapsed += _checkTimer_Elapsed;
            _queueTimer = new System.Timers.Timer();
            _queueTimer.Interval = _delayAccuracy.TotalMilliseconds;
            _queueTimer.Elapsed += _queueTimer_Elapsed;
            StartCheck();
        }

        async Task StartCheck()
        {
            _checkTimer.Start();
        }

        async Task StartTimer()
        {
            if (IsStarted)
            {
                return;
            }
            CancelToken = new CancellationTokenSource();
            IsStarted = true;
            _queueTimer.Start();
            IsStarted = false;
        }
        void _checkTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            if (_queueSortValues.Count < 1)
            {
                _timerCount = ulong.MinValue;
            }
        }
        void _queueTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            if (PauseCount)
            {
                return;
            }
            _timerCount++;
            if (MinQueueKey > ulong.MinValue && _timerCount == MinQueueKey)
            {
                _timeFinishAction.SendAsync(MinQueueKey);
            }
            MinQueueKey = _queueSortValues.Count < 1 ? ulong.MinValue : _queueSortValues.Keys.ElementAt(0);
        }

        void TimeFinished(ulong arg)
        {
            if (arg == ulong.MinValue)
            {
                return;
            }
            if (_queueSortValues.TryGetValue(arg, out Queue<DelayQueueKey> value))
            {
                _queueBuffer.Post(value);
                _queueSortValues.Remove(arg);
            }
        }

        void SetTimerCount(DelayQueueKey arg)
        {
            PauseCount = true;
            try
            {
                double delayVal = arg.DelayTime.TotalMilliseconds / _delayAccuracy.TotalMilliseconds;
                ulong minCount = delayVal < 1 ? 1 : (ulong)Math.Ceiling(delayVal);
                Queue<DelayQueueKey> queue;
                if (_queueSortValues.Count < 1)
                {
                    queue = new Queue<DelayQueueKey>();
                    queue.Enqueue(arg);
                    _queueSortValues.Add(minCount, queue);
                    //重置计时标记
                    _timerCount = ulong.MinValue;
                }
                else
                {
                    minCount = _timerCount + minCount;
                    if (_queueSortValues.TryGetValue(minCount, out queue))
                    {
                        queue.Enqueue(arg);
                    }
                    else
                    {
                        queue = new Queue<DelayQueueKey>();
                        queue.Enqueue(arg);
                        _queueSortValues.Add(minCount, queue);
                    }
                }
                MinQueueKey = _queueSortValues.Keys.ElementAt(0);
            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                PauseCount = false;
            }
        }

        TimeSpan GetDelayAccuracy(QueueDelayAccuracy? level)
        {
            switch (level)
            {
                case QueueDelayAccuracy.Minute:
                    return TimeSpan.FromMinutes(1);
                case QueueDelayAccuracy.Milliseconds:
                    return TimeSpan.FromMilliseconds(1);

                default:
                case QueueDelayAccuracy.Second:
                    return TimeSpan.FromSeconds(1);
            }
        }

        async Task QueueBlockAction(Queue<DelayQueueKey> arg)
        {
            while (arg?.Count > 0)
            {
                var key = arg.Dequeue();
                if (_queueItems.TryRemove(key, out IList<BaseQueueItem> taskQueue))
                {
                    Parallel.ForEach(taskQueue, (tq) =>
                    {
                        if (tq.IsCancellation)
                        {
                            return;
                        }
                        bool? result = tq.QueueAction();
                        if (_enableRetry && tq.EnableRetry && result != true)
                        {
                            _retryAction.SendAsync(ValueTuple.Create(key, tq));
                        }
                    });
                }
            }
        }

        async Task AddRetry((DelayQueueKey key, BaseQueueItem queueItem) item)
        {
            if (item.key.RetryCount >= _retryCount)
            {
                return;
            }
            item.key.RetryCount += 1;
            await _plusTimerCount.SendAsync(item.key);
            IList<BaseQueueItem> queueModel;
            if (_queueItems.TryGetValue(item.key, out queueModel))
            {
                queueModel.Add(item.queueItem);
            }
            else
            {
                queueModel = new List<BaseQueueItem>() { item.queueItem };
                _queueItems.TryAdd(item.key, queueModel);
            }
        }

        public async Task<bool> Enqueue(QueueItem callback)
        {
            if (callback == null)
            {
                return false;
            }
            callback.QueueName = string.IsNullOrWhiteSpace(callback.QueueName) ? _queueName : callback.QueueName;
            if (!string.Equals(_queueName, callback.QueueName))
            {
                return false;
            }
            var key = new DelayQueueKey
            {
                QueueName = callback.QueueName,
                TaskName = callback.TaskName,
                DelayTime = callback.DelayTime,
            };
            await _plusTimerCount.SendAsync(key);
            IList<BaseQueueItem> queueModel;
            if (_queueItems.TryGetValue(key, out queueModel))
            {
                queueModel.Add(callback);
                return true;
            }
            else
            {
                queueModel = new List<BaseQueueItem>() { callback };
                return _queueItems.TryAdd(key, queueModel);
            }
        }

        public async Task<bool> Enqueue<T>(QueueItem<T> data)
        {
            if (data == null)
            {
                return false;
            }
            data.QueueName = string.IsNullOrWhiteSpace(data.QueueName) ? _queueName : data.QueueName;
            if (!string.Equals(_queueName, data.QueueName))
            {
                return false;
            }
            var key = new DelayQueueKey
            {
                QueueName = data.QueueName,
                TaskName = data.TaskName,
                DelayTime = data.DelayTime,
            };
            await _plusTimerCount.SendAsync(key);
            IList<BaseQueueItem> queueModel;
            if (_queueItems.TryGetValue(key, out queueModel))
            {
                queueModel.Add(data);
                return true;
            }
            else
            {
                queueModel = new List<BaseQueueItem>() { data };
                return _queueItems.TryAdd(key, queueModel);
            }
        }

        public bool Exist(QueueItem data)
        {
            var key = new DelayQueueKey()
            {
                QueueName = data.QueueName,
                TaskName = data.TaskName,
                DelayTime = data.DelayTime,
            };
            if (_queueItems.ContainsKey(key))
            {
                return true;
            }
            if (_shareQuery && _shareExistFunc != null)
            {
                return _shareExistFunc.Invoke(key);
            }
            return false;
        }

        public bool Exist<T>(QueueItem<T> data)
        {
            var key = new DelayQueueKey()
            {
                QueueName = data.QueueName,
                TaskName = data.TaskName,
                DelayTime = data.DelayTime,
            };
            if (_queueItems.ContainsKey(key))
            {
                return true;
            }
            if (_shareQuery && _shareExistFunc != null)
            {
                return _shareExistFunc.Invoke(key);
            }
            return false;
        }

        internal bool Exist(DelayQueueKey key)
        {
            return _queueItems.ContainsKey(key);
        }

        public void Start()
        {
            StartTimer();
            Console.WriteLine("Started: {0}", DateTime.Now);
        }

        public void Stop()
        {
            if (!IsStarted)
            {
                return;
            }
            CancelToken?.Cancel(false);
            IsStarted = false;
            _queueTimer.Stop();
        }

        public async void Clear()
        {
            _queueItems.Clear();
            while (PauseCount)
            {
                await Task.Delay(100);
            }
            _queueSortValues.Clear();
        }
    }

#endif
}
